The concept of writing data back to a destination system is fundamentally the same as reading it: you need a writer, and Spark provides the DataFrameWriter interface for that purpose. A typical write involves four methods.
- format
- option
- mode
- save
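Chained together, a minimal write looks like the sketch below. The DataFrame `df` and the output path are illustrative; any existing DataFrame and writable path would do.

```scala
// Write a DataFrame out as CSV files.
// df is assumed to be an existing DataFrame; the path is an example.
df.write
  .format("csv")                   // destination file format
  .option("header", "true")        // a format-specific option
  .mode("overwrite")               // what to do if the path already exists
  .save("/tmp/output/csv-data/")   // trigger the write
```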
How to handle a file that already exists
While working with a file-based destination, you may run into a situation where the target file already exists. To handle that scenario, the writer offers a mode method that takes one of the following options.
- append
- overwrite
- errorIfExists
- ignore
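The mode can be passed either as a string or through the SaveMode enum. A small sketch, with illustrative paths (errorIfExists is the default behaviour when no mode is set):

```scala
import org.apache.spark.sql.SaveMode

// Two equivalent ways to set the save mode; both fail if the path exists.
df.write.format("parquet").mode("errorIfExists").save("/tmp/output/p1/")
df.write.format("parquet").mode(SaveMode.ErrorIfExists).save("/tmp/output/p2/")
```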
Additional Spark DataFrame write options
If you are working with a file-based destination system, you also have three additional methods.
- partitionBy
- bucketBy
- sortBy
The partitionBy method allows you to partition your output files, and the bucketBy method allows you to hash the data into a finite number of buckets. Both methods follow the Hive partitioning and bucketing principles.
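A sketch combining the three methods; the column names and table name are hypothetical. Note that bucketBy and sortBy require saveAsTable rather than save, since bucket metadata lives in the table catalog:

```scala
// Partition output by country, then hash rows within each partition
// into 8 buckets by id, sorted by id inside each bucket.
// Column names and table name are illustrative.
df.write
  .format("parquet")
  .partitionBy("country")          // one sub-directory per country value
  .bucketBy(8, "id")               // hash rows into 8 buckets by id
  .sortBy("id")                    // sort rows within each bucket
  .mode("overwrite")
  .saveAsTable("survey_bucketed")
```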
Write Data Into a Hive Partitioned Table
import org.apache.spark.sql.SaveMode

var dbName = "your database name"
var finaltable = "your table name"
// First check whether the table exists
if (spark.sql("show tables in " + dbName).filter("tableName='" + finaltable + "'").collect().length == 0) {
  // If the table is not available, create it
  println("Table Not Present \n Creating table " + finaltable)
  spark.sql("use " + dbName)
  spark.sql("SET hive.exec.dynamic.partition = true")
  spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
  spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
  spark.sql("create table " + dbName + "." + finaltable + """(
    EMP_ID string,
    EMP_Name string,
    EMP_Address string,
    EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)""")
}
// The table exists now; insert the DataFrame in append mode
df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)
Different ways to write data into different file formats
Read CSV -> Write Parquet
Read Parquet -> Write JSON
Read JSON -> Write ORC
Read ORC -> Write XML
Read XML -> Write AVRO
Read AVRO -> Write CSV
//Read CSV into Data Frame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("/spark-data/mental-health-in-tech-survey/survey.csv")
//Write Data Frame to Parquet
df.write
.format("parquet")
.mode("overwrite")
.save("/spark-data/mental-health-in-tech-survey/parquet-data/")
//Read Parquet into Data Frame
val df = spark.read
.format("parquet")
.option("mode", "failfast")
.load("/spark-data/mental-health-in-tech-survey/parquet-data/")
//Write Data Frame to JSON
df.write
.format("json")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.mode("overwrite")
.save("/spark-data/mental-health-in-tech-survey/json-data/")
//Read JSON into Data Frame
val df = spark.read
.format("json")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/json-data/")
//Write Data Frame to ORC
df.write
.format("orc")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")
spark-shell --packages com.databricks:spark-xml_2.11:0.4.1,com.databricks:spark-avro_2.11:4.0.0
//Read ORC into Data Frame
val df = spark.read
.format("orc")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")
//Write Data Frame to XML
df.write
.format("com.databricks.spark.xml")
.option("rootTag", "survey")
.option("rowTag", "survey-row")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")
//Read XML into Data Frame
val df = spark.read
.format("com.databricks.spark.xml")
.option("rowTag", "survey-row")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")
//Write Data Frame to AVRO
df.write
.format("com.databricks.spark.avro")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/avro-data/")